/* omhttp.c
 * This is an http output module based on omelasticsearch
 *
 * NOTE: read comments in module-template.h for more specifics!
 *
 * Copyright 2011 Nathan Scott.
 * Copyright 2009-2018 Rainer Gerhards and Adiscon GmbH.
 * Copyright 2018 Christian Tramnitz
 *
 * This file is part of rsyslog.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *       -or-
 *       see COPYING.ASL20 in the source distribution
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "config.h"
#include "rsyslog.h"
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <memory.h>
#include <string.h>
#include <curl/curl.h>
#include <curl/easy.h>
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#if defined(__FreeBSD__)
#include <unistd.h>
#endif
#include <json.h>
#include <zlib.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
#include "template.h"
#include "module-template.h"
#include "errmsg.h"
#include "cfsysline.h"
#include "unicode-helper.h"
#include "obj-types.h"
#include "ratelimit.h"
#include "ruleset.h"
#include "statsobj.h"

#ifndef O_LARGEFILE
#  define O_LARGEFILE 0
#endif

/* Module declaration: an output module whose configuration name is "omhttp". */
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omhttp")

/* internal structures */
/* Static module data plus the rsyslog object interfaces this file uses
 * (prop for the input name, ruleset for retries, statsobj for counters).
 */
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
DEFobjCurrIf(statsobj)

/* Statistics object of this module and the counters kept for it. */
statsobj_t *httpStats;
STATSCOUNTER_DEF(ctrMessagesSubmitted, mutCtrMessagesSubmitted); // Number of messages submitted to the module
STATSCOUNTER_DEF(ctrMessagesSuccess, mutCtrMessagesSuccess); // Number of messages successfully sent
STATSCOUNTER_DEF(ctrMessagesFail, mutCtrMessagesFail); // Number of messages that failed to send
STATSCOUNTER_DEF(ctrMessagesRetry, mutCtrMessagesRetry); // Number of messages requeued for retry
STATSCOUNTER_DEF(ctrHttpRequestCount, mutCtrHttpRequestCount); // Number of attempted HTTP requests
STATSCOUNTER_DEF(ctrHttpRequestSuccess, mutCtrHttpRequestSuccess); // Number of successful HTTP requests
STATSCOUNTER_DEF(ctrHttpRequestFail, mutCtrHttpRequestFail); // Number of failed HTTP requests, 4XX+ are NOT failures
STATSCOUNTER_DEF(ctrHttpStatusSuccess, mutCtrHttpStatusSuccess); // Number of requests returning 1XX/2XX status
STATSCOUNTER_DEF(ctrHttpStatusFail, mutCtrHttpStatusFail); // Number of requests returning 300+ status

/* Input name property set on messages that are re-injected via the retry ruleset. */
static prop_t *pInputName = NULL;

/* Type tag used with PTR_ASSERT_SET_TYPE/PTR_ASSERT_CHK on worker instance data. */
#define WRKR_DATA_TYPE_ES 0xBADF0001

/* Complete HTTP header lines handed to libcurl. The empty "Expect:" header is
 * sent to suppress libcurl's automatic "Expect: 100-continue".
 */
#define HTTP_HEADER_CONTENT_JSON "Content-Type: application/json; charset=utf-8"
#define HTTP_HEADER_CONTENT_TEXT "Content-Type: text/plain"
#define HTTP_HEADER_CONTENT_KAFKA "Content-Type: application/vnd.kafka.v1+json"
#define HTTP_HEADER_ENCODING_GZIP "Content-Encoding: gzip"
#define HTTP_HEADER_EXPECT_EMPTY "Expect:"

/* Space-separated list of the accepted batch format names; its user is not
 * visible in this part of the file (presumably config validation messages).
 */
#define VALID_BATCH_FORMATS "newline jsonarray kafkarest lokirest"
/* Batch body formats. When no content type is configured, the format selects
 * the default Content-Type header (see buildCurlHeaders()).
 */
typedef enum batchFormat_e {
	FMT_NEWLINE,
	FMT_JSONARRAY,
	FMT_KAFKAREST,
	FMT_LOKIREST
} batchFormat_t;

/* REST API uses this URL:
 * https://<hostName>:<restPort>/restPath
 *
 * Per-action configuration. One instance is shared by all worker instances
 * of the action (workers reach it through wrkrInstanceData.pData).
 */
typedef struct curl_slist HEADER;
typedef struct instanceConf_s {
	int defaultPort;
	int fdErrFile;		/* error file fd or -1 if not open */
	pthread_mutex_t mutErrFile;	/* held while rendering and writing an error file record */
	uchar **serverBaseUrls;	/* numServers base URLs; workers index it with serverIndex */
	int numServers;
	long healthCheckTimeout;
	uchar *uid;
	uchar *pwd;
	uchar *authBuf;
	uchar *httpcontenttype;	/* if set, headerContentTypeBuf is sent instead of a default content type */
	uchar *headerContentTypeBuf;
	uchar *httpheaderkey;
	uchar *httpheadervalue;
	uchar *headerBuf;	/* if set, sent as an additional header line */
	uchar **httpHeaders;	/* nHttpHeaders additional header lines */
	int nHttpHeaders;
	uchar *restPath;	/* appended to the base URL unless dynRestPath supplies a path */
	uchar *checkPath;	/* health check path; NULL disables the health check */
	uchar *tplName;
	uchar *errorFile;	/* error file name; NULL means no error records are written */
	sbool batchMode;
	uchar *batchFormatName;
	batchFormat_t batchFormat;
	sbool bFreeBatchFormatName;	/* batchFormatName is freed on destruction only if this is set */
	sbool dynRestPath;
	size_t maxBatchBytes;
	size_t maxBatchSize;	/* number of slots allocated for a worker's batch array */
	sbool compress;
	int compressionLevel;	/* Compression level for zlib, default=-1, fastest=1, best=9, none=0*/
	sbool useHttps;
	sbool allowUnsignedCerts;
	sbool skipVerifyHost;
	uchar *caCertFile;
	uchar *myCertFile;
	uchar *myPrivKeyFile;
	sbool reloadOnHup;
	sbool retryFailures;
	unsigned int ratelimitInterval;
	unsigned int ratelimitBurst;
	/* for retries */
	ratelimit_t *ratelimiter;	/* retried messages are submitted through this ratelimiter */
	uchar *retryRulesetName;
	ruleset_t *retryRuleset;
	struct instanceConf_s *next;
} instanceData;

/* Module-level configuration: the owning rsyslog config plus the head and tail
 * pointers of an instance list.
 */
struct modConfData_s {
	rsconf_t *pConf;		/* our overall config object */
	instanceConf_t *root, *tail;
};
static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */

/* Per-worker state: curl handles, the reply buffer filled by curlResult(),
 * the pending batch and the buffer that receives gzip output.
 */
typedef struct wrkrInstanceData {
	PTR_ASSERT_DEF
	instanceData *pData;
	int serverIndex;	/* index into pData->serverBaseUrls of the server currently in use */
	int replyLen;		/* number of reply bytes collected so far */
	char *reply;		/* response body collected by curlResult(), NULL if none */
	long httpStatusCode;	/* http status code of response */
	CURL	*curlCheckConnHandle;	/* libcurl session handle for checking the server connection */
	CURL	*curlPostHandle;	/* libcurl session handle for posting data to the server */
	HEADER	*curlHeader;	/* json POST request info */
	uchar *restURL;		/* last used URL for error reporting */
	sbool bzInitDone;	/* set while zstrm is initialized via deflateInit2() */
	z_stream zstrm; /* zip stream to use for gzip http compression */
	struct {
		uchar **data; /* array of strings, this will be batched up lazily */
		size_t sizeBytes; /* total length of this batch in bytes */
		size_t nmemb;	/* number of messages in batch (for statistics counting) */
	} batch;
	struct {
		uchar *buf;	/* compressed output */
		size_t curLen;	/* bytes of buf currently in use */
		size_t len;	/* allocated size of buf */
	} compressCtx;
} wrkrInstanceData_t;

/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
/* Each entry is { parameter name, handler type, flags }; all flags are 0 here. */
static struct cnfparamdescr actpdescr[] = {
	{ "server", eCmdHdlrArray, 0 },
	{ "serverport", eCmdHdlrInt, 0 },
	{ "healthchecktimeout", eCmdHdlrInt, 0 },
	{ "httpcontenttype", eCmdHdlrGetWord, 0 },
	{ "httpheaderkey", eCmdHdlrGetWord, 0 },
	{ "httpheadervalue", eCmdHdlrString, 0 },
	{ "httpheaders", eCmdHdlrArray, 0 },
	{ "uid", eCmdHdlrGetWord, 0 },
	{ "pwd", eCmdHdlrGetWord, 0 },
	{ "restpath", eCmdHdlrGetWord, 0 },
	{ "checkpath", eCmdHdlrGetWord, 0 },
	{ "dynrestpath", eCmdHdlrBinary, 0 },
	{ "batch", eCmdHdlrBinary, 0 },
	{ "batch.format", eCmdHdlrGetWord, 0 },
	{ "batch.maxbytes", eCmdHdlrSize, 0 },
	{ "batch.maxsize", eCmdHdlrSize, 0 },
	{ "compress", eCmdHdlrBinary, 0 },
	{ "compress.level", eCmdHdlrInt, 0 },
	{ "usehttps", eCmdHdlrBinary, 0 },
	{ "errorfile", eCmdHdlrGetWord, 0 },
	{ "template", eCmdHdlrGetWord, 0 },
	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
	{ "skipverifyhost", eCmdHdlrBinary, 0 },
	{ "tls.cacert", eCmdHdlrString, 0 },
	{ "tls.mycert", eCmdHdlrString, 0 },
	{ "tls.myprivkey", eCmdHdlrString, 0 },
	{ "reloadonhup", eCmdHdlrBinary, 0 },
	{ "retry", eCmdHdlrBinary, 0 },
	{ "retry.ruleset", eCmdHdlrString, 0 },
	{ "ratelimit.interval", eCmdHdlrInt, 0 },
	{ "ratelimit.burst", eCmdHdlrInt, 0 },
};
/* Parameter block: version, number of entries in actpdescr, and the table itself. */
static struct cnfparamblk actpblk =
	{ CNFPARAMBLK_VERSION,
	  sizeof(actpdescr)/sizeof(struct cnfparamdescr),
	  actpdescr
	};

/* Forward declarations: the curl helpers are used before their definitions. */
static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData);
static void curlCleanup(wrkrInstanceData_t *pWrkrData);
static void curlCheckConnSetup(wrkrInstanceData_t *const pWrkrData);

/* compressCtx functions */
/* set the compression buffer to "empty, nothing allocated" */
static void ATTR_NONNULL()
initCompressCtx(wrkrInstanceData_t *pWrkrData);

/* release the compression buffer */
static void ATTR_NONNULL()
freeCompressCtx(wrkrInstanceData_t *pWrkrData);

/* discard buffered output and size the buffer to len bytes */
static rsRetVal ATTR_NONNULL()
resetCompressCtx(wrkrInstanceData_t *pWrkrData, size_t len);

/* resize the buffer to exactly newLen bytes */
static rsRetVal ATTR_NONNULL()
growCompressCtx(wrkrInstanceData_t *pWrkrData, size_t newLen);

/* append srcLen bytes from srcBuf, growing the buffer if needed */
static rsRetVal ATTR_NONNULL()
appendCompressCtx(wrkrInstanceData_t *pWrkrData, uchar *srcBuf, size_t srcLen);

BEGINcreateInstance
CODESTARTcreateInstance
	pData->fdErrFile = -1;
	pthread_mutex_init(&pData->mutErrFile, NULL);
	pData->caCertFile = NULL;
	pData->myCertFile = NULL;
	pData->myPrivKeyFile = NULL;
	pData->ratelimiter = NULL;
	pData->retryRulesetName = NULL;
	pData->retryRuleset = NULL;
ENDcreateInstance

BEGINcreateWrkrInstance
uchar **batchData;
CODESTARTcreateWrkrInstance
	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
	pWrkrData->curlHeader = NULL;
	pWrkrData->curlPostHandle = NULL;
	pWrkrData->curlCheckConnHandle = NULL;
	pWrkrData->serverIndex = 0;
	pWrkrData->httpStatusCode = 0;
	pWrkrData->restURL = NULL;
	pWrkrData->bzInitDone = 0;
	if(pData->batchMode) {
		pWrkrData->batch.nmemb = 0;
		pWrkrData->batch.sizeBytes = 0;
		batchData = (uchar **) malloc(pData->maxBatchSize * sizeof(uchar *));
		if (batchData == NULL) {
			LogError(0, RS_RET_OUT_OF_MEMORY,
				"omhttp: cannot allocate memory for batch queue turning off batch mode\n");
			pData->batchMode = 0; /* at least it works */
		} else {
			pWrkrData->batch.data = batchData;
		}
	}
	initCompressCtx(pWrkrData);
	iRet = curlSetup(pWrkrData);
ENDcreateWrkrInstance

BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
	/* repeated message reduction is the only feature explicitly accepted here */
	if(sFEATURERepeatedMsgReduction == eFeat) {
		iRet = RS_RET_OK;
	}
ENDisCompatibleWithFeature

BEGINfreeInstance
	int i;
CODESTARTfreeInstance
	if(pData->fdErrFile != -1)
		close(pData->fdErrFile);
	pthread_mutex_destroy(&pData->mutErrFile);
	for(i = 0 ; i < pData->numServers ; ++i)
		free(pData->serverBaseUrls[i]);
	free(pData->serverBaseUrls);
	free(pData->uid);
	free(pData->httpcontenttype);
	free(pData->headerContentTypeBuf);
	free(pData->httpheaderkey);
	free(pData->httpheadervalue);
	for(i = 0 ; i < pData->nHttpHeaders ; ++i) {
		free((void*) pData->httpHeaders[i]);
	}
	free(pData->httpHeaders);
	pData->nHttpHeaders = 0;
	free(pData->pwd);
	free(pData->authBuf);
	free(pData->headerBuf);
	free(pData->restPath);
	free(pData->checkPath);
	free(pData->tplName);
	free(pData->errorFile);
	free(pData->caCertFile);
	free(pData->myCertFile);
	free(pData->myPrivKeyFile);
	free(pData->retryRulesetName);
	if (pData->ratelimiter != NULL)
		ratelimitDestruct(pData->ratelimiter);
	if (pData->bFreeBatchFormatName)
		free(pData->batchFormatName);
ENDfreeInstance

BEGINfreeWrkrInstance
CODESTARTfreeWrkrInstance
	/* curl state goes first */
	curlCleanup(pWrkrData);

	/* then the buffers owned by the worker; pointers are cleared afterwards */
	free(pWrkrData->restURL);
	free(pWrkrData->batch.data);
	pWrkrData->restURL = NULL;
	pWrkrData->batch.data = NULL;

	/* finally the compression state, if a deflate stream is still open */
	if (pWrkrData->bzInitDone) {
		deflateEnd(&pWrkrData->zstrm);
	}
	freeCompressCtx(pWrkrData);
ENDfreeWrkrInstance

BEGINdbgPrintInstInfo
	int i;
CODESTARTdbgPrintInstInfo
	dbgprintf("omhttp\n");
	dbgprintf("\ttemplate='%s'\n", pData->tplName);
	dbgprintf("\tnumServers=%d\n", pData->numServers);
	dbgprintf("\thealthCheckTimeout=%lu\n", pData->healthCheckTimeout);
	dbgprintf("\tserverBaseUrls=");
	for(i = 0 ; i < pData->numServers ; ++i)
		dbgprintf("%c'%s'", i == 0 ? '[' : ' ', pData->serverBaseUrls[i]);
	dbgprintf("]\n");
	dbgprintf("\tdefaultPort=%d\n", pData->defaultPort);
	dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid);
	dbgprintf("\thttpcontenttype='%s'\n", pData->httpcontenttype == NULL ?
		(uchar*)"(not configured)" : pData->httpcontenttype);
	dbgprintf("\thttpheaderkey='%s'\n", pData->httpheaderkey == NULL ?
		(uchar*)"(not configured)" : pData->httpheaderkey);
	dbgprintf("\thttpheadervalue='%s'\n", pData->httpheadervalue == NULL ?
		(uchar*)"(not configured)" : pData->httpheadervalue);
	dbgprintf("\thttpHeaders=[");
	for(i = 0 ; i < pData->nHttpHeaders ; ++i)
		dbgprintf("\t%s\n",pData->httpHeaders[i]);
	dbgprintf("\t]\n");
	dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
	dbgprintf("\trest path='%s'\n", pData->restPath);
	dbgprintf("\tcheck path='%s'\n", pData->checkPath);
	dbgprintf("\tdynamic rest path=%d\n", pData->dynRestPath);
	dbgprintf("\tuse https=%d\n", pData->useHttps);
	dbgprintf("\tbatch=%d\n", pData->batchMode);
	dbgprintf("\tbatch.format='%s'\n", pData->batchFormatName);
	dbgprintf("\tbatch.maxbytes=%zu\n", pData->maxBatchBytes);
	dbgprintf("\tbatch.maxsize=%zu\n", pData->maxBatchSize);
	dbgprintf("\tcompress=%d\n", pData->compress);
	dbgprintf("\tcompress.level=%d\n", pData->compressionLevel);
	dbgprintf("\tallowUnsignedCerts=%d\n", pData->allowUnsignedCerts);
	dbgprintf("\tskipVerifyHost=%d\n", pData->skipVerifyHost);
	dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
		(uchar*)"(not configured)" : pData->errorFile);
	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
	dbgprintf("\treloadonhup='%d'\n", pData->reloadOnHup);
	dbgprintf("\tretry='%d'\n", pData->retryFailures);
	dbgprintf("\tretry.ruleset='%s'\n", pData->retryRulesetName);
	dbgprintf("\tratelimit.interval='%u'\n", pData->ratelimitInterval);
	dbgprintf("\tratelimit.burst='%u'\n", pData->ratelimitBurst);
ENDdbgPrintInstInfo


/* libcurl write callback: collects the response body in pWrkrData->reply.
 *
 * ptr/size/nmemb describe the chunk delivered by libcurl, userdata is the
 * worker instance. Returns the number of bytes consumed (size*nmemb), or 0
 * when the buffer cannot be grown.
 *
 * The buffer is kept NUL-terminated after every chunk (the extra byte is
 * reserved by the realloc) because the reply is also printed with "%s".
 * replyLen does not count the terminator.
 */
static size_t
curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
{
	wrkrInstanceData_t *const pWrkrData = (wrkrInstanceData_t*) userdata;
	const size_t chunkLen = size * nmemb;
	size_t newlen;
	char *buf;

	PTR_ASSERT_CHK(pWrkrData, WRKR_DATA_TYPE_ES);
	newlen = pWrkrData->replyLen + chunkLen;
	if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) {
		LogError(errno, RS_RET_ERR, "omhttp: realloc failed in curlResult");
		return 0; /* abort due to failure */
	}
	memcpy(buf + pWrkrData->replyLen, ptr, chunkLen);
	buf[newlen] = '\0';
	pWrkrData->reply = buf;
	pWrkrData->replyLen = newlen;
	return chunkLen;
}

/* Build basic URL part, which includes hostname and port as follows:
 * http://hostname:port/ based on a server param
 * Newly creates a cstr for this purpose.
 * Note: serverParam MUST NOT end in '/' (caller must strip if it exists)
 *
 * - If serverParam does not start with "http://" or "https://", a scheme is
 *   prepended according to useHttps.
 * - If the part after the scheme contains no ':', ":<defaultPort>" is added.
 *
 * On success *baseUrl receives a malloc()ed string owned by the caller. On
 * failure RS_RET_ERR is returned and *baseUrl is left untouched.
 */
static rsRetVal
computeBaseUrl(const char*const serverParam,
	const int defaultPort,
	const sbool useHttps,
	uchar **baseUrl)
{
#	define SCHEME_HTTPS "https://"
#	define SCHEME_HTTP "http://"

	char portBuf[64];
	int r = 0;
	const char *host = serverParam;
	uchar *url = NULL;
	es_str_t *urlBuf = NULL;
	DEFiRet;

	/* guard the index so an empty string is not read at position -1 */
	assert(serverParam[0] == '\0' || serverParam[strlen(serverParam)-1] != '/');

	urlBuf = es_newStr(256);
	if (urlBuf == NULL) {
		LogError(0, RS_RET_OUT_OF_MEMORY,
		"omhttp: failed to allocate es_str urlBuf in computeBaseUrl");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	/* Find where the hostname/ip of the server starts. If the scheme is not specified
	 * in the uri, start the buffer with a scheme corresponding to the useHttps parameter.
	 * A scheme only counts when it is a prefix: the host offset below assumes
	 * position 0, so a match further inside the string must not be accepted.
	 */
	if (strcasestr(serverParam, SCHEME_HTTP) == serverParam)
		host = serverParam + strlen(SCHEME_HTTP);
	else if (strcasestr(serverParam, SCHEME_HTTPS) == serverParam)
		host = serverParam + strlen(SCHEME_HTTPS);
	else
		r = useHttps ? es_addBuf(&urlBuf, SCHEME_HTTPS, sizeof(SCHEME_HTTPS)-1) :
			es_addBuf(&urlBuf, SCHEME_HTTP, sizeof(SCHEME_HTTP)-1);

	if (r == 0) r = es_addBuf(&urlBuf, (char *)serverParam, strlen(serverParam));
	if (r == 0 && !strchr(host, ':')) {
		snprintf(portBuf, sizeof(portBuf), ":%d", defaultPort);
		r = es_addBuf(&urlBuf, portBuf, strlen(portBuf));
	}
	if (r == 0) r = es_addChar(&urlBuf, '/');
	if (r == 0) url = (uchar*) es_str2cstr(urlBuf, NULL);

	/* check the converted string itself, not the out-parameter's address */
	if (r != 0 || url == NULL) {
		LogError(0, RS_RET_ERR,
			"omhttp: error occurred computing baseUrl from server %s", serverParam);
		ABORT_FINALIZE(RS_RET_ERR);
	}
	*baseUrl = url;
finalize_it:
	if (urlBuf) {
		es_deleteStr(urlBuf);
	}
	RETiRet;
}

/* Advance the worker to the next configured server, wrapping at the end of the list. */
static inline void
incrementServerIndex(wrkrInstanceData_t *pWrkrData)
{
	const int nServers = pWrkrData->pData->numServers;
	const int candidate = pWrkrData->serverIndex + 1;

	pWrkrData->serverIndex = candidate % nServers;
}


/* Health check with failover. Starting at the worker's current server, the
 * configured check path is requested from each server in turn until one
 * transfer completes with CURLE_OK; the worker's serverIndex then stays on
 * that server. Without a check path nothing is requested and RS_RET_OK is
 * returned. If no server answers (or the URL cannot be built) the result is
 * RS_RET_SUSPENDED. Any collected reply is discarded before returning.
 */
static rsRetVal ATTR_NONNULL()
checkConn(wrkrInstanceData_t *const pWrkrData)
{
	instanceData *const pData = pWrkrData->pData;
	es_str_t *urlBuf = NULL;
	char *healthUrl = NULL;
	CURL *curl;
	CURLcode res;
	int attempt;
	DEFiRet;

	if (pData->checkPath == NULL) {
		DBGPRINTF("omhttp: checkConn no health check uri configured skipping it\n");
		FINALIZE;
	}

	pWrkrData->reply = NULL;
	pWrkrData->replyLen = 0;
	curl = pWrkrData->curlCheckConnHandle;
	if ((urlBuf = es_newStr(256)) == NULL) {
		LogError(0, RS_RET_OUT_OF_MEMORY,
			"omhttp: unable to allocate buffer for health check uri.");
		ABORT_FINALIZE(RS_RET_SUSPENDED);
	}

	for (attempt = 0; attempt < pData->numServers; ++attempt) {
		char *const serverUrl = (char*) pData->serverBaseUrls[pWrkrData->serverIndex];
		char *const checkPath = (char*) pData->checkPath;
		int r;

		/* health URL = base URL of the current server + check path */
		es_emptyStr(urlBuf);
		r = es_addBuf(&urlBuf, serverUrl, strlen(serverUrl));
		if (r == 0 && checkPath != NULL) {
			r = es_addBuf(&urlBuf, checkPath, strlen(checkPath));
		}
		healthUrl = (r == 0) ? es_str2cstr(urlBuf, NULL) : NULL;
		if (healthUrl == NULL) {
			LogError(0, RS_RET_OUT_OF_MEMORY,
				"omhttp: unable to allocate buffer for health check uri.");
			ABORT_FINALIZE(RS_RET_SUSPENDED);
		}

		curlCheckConnSetup(pWrkrData);
		curl_easy_setopt(curl, CURLOPT_URL, healthUrl);
		res = curl_easy_perform(curl);
		free(healthUrl);

		if (res == CURLE_OK) {
			DBGPRINTF("omhttp: checkConn %s completed with success "
				"on attempt %d\n", serverUrl, attempt);
			ABORT_FINALIZE(RS_RET_OK);
		}

		/* this server did not answer, move on to the next one */
		DBGPRINTF("omhttp: checkConn %s failed on attempt %d: %s\n",
			serverUrl, attempt, curl_easy_strerror(res));
		incrementServerIndex(pWrkrData);
	}

	LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
		"omhttp: checkConn failed after %d attempts.", attempt);
	ABORT_FINALIZE(RS_RET_SUSPENDED);

finalize_it:
	if (urlBuf != NULL) {
		es_deleteStr(urlBuf);
	}

	/* the response body is of no interest, drop it */
	free(pWrkrData->reply);
	pWrkrData->reply = NULL;
	RETiRet;
}


/* Resume hook: the result of the health check decides whether the action may
 * resume. checkConn() returns RS_RET_SUSPENDED when no server answered.
 */
BEGINtryResume
CODESTARTtryResume
	DBGPRINTF("omhttp: tryResume called\n");
	iRet = checkConn(pWrkrData);
ENDtryResume


/* Select the REST path for the current request.
 *
 * *restPath is set to tpls[1] when a template array is supplied and
 * dynRestPath is enabled; otherwise to the statically configured
 * pData->restPath. The result may be NULL and is not a copy - the caller
 * must not free it.
 * NOTE(review): tpls[0] is presumably the rendered message body - confirm
 * against the template setup code, which is outside this part of the file.
 */
static void ATTR_NONNULL(1)
getRestPath(const instanceData *const pData, uchar **const tpls,
		      uchar **const restPath)
{
	/* validate the out-parameter before it is written through */
	assert(restPath != NULL);

	if(tpls != NULL && pData->dynRestPath) {
		*restPath = tpls[1];
	} else {
		*restPath = pData->restPath;
	}
}


/* Compute the POST URL (base URL of the current server plus REST path),
 * remember it in pWrkrData->restURL and set it on the POST curl handle.
 *
 * tpls may be NULL; see getRestPath() for how the path is chosen.
 * Returns RS_RET_ERR if the URL cannot be built. In that case the previously
 * stored restURL and the URL on the curl handle are left unchanged.
 */
static rsRetVal ATTR_NONNULL(1)
setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
{
	uchar *restPath;
	char* baseUrl;
	char *newURL;
	es_str_t *url = NULL;
	int r;
	DEFiRet;
	instanceData *const pData = pWrkrData->pData;

	baseUrl = (char*)pData->serverBaseUrls[pWrkrData->serverIndex];
	url = es_newStrFromCStr(baseUrl, strlen(baseUrl));
	if (url == NULL) {
		LogError(0, RS_RET_OUT_OF_MEMORY,
			"omhttp: error allocating new estr for POST url.");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	getRestPath(pData, tpls, &restPath);

	r = 0;
	if (restPath != NULL)
		r = es_addBuf(&url, (char*)restPath, ustrlen(restPath));

	if(r != 0) {
		LogError(0, RS_RET_ERR, "omhttp: failure in creating restURL, "
				"error code: %d", r);
		ABORT_FINALIZE(RS_RET_ERR);
	}

	/* Convert first and swap only on success, so a failed conversion never
	 * hands a NULL URL to libcurl or loses the previous URL.
	 */
	newURL = es_str2cstr(url, NULL);
	if (newURL == NULL) {
		LogError(0, RS_RET_OUT_OF_MEMORY,
			"omhttp: error allocating cstr for POST url.");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	free(pWrkrData->restURL);
	pWrkrData->restURL = (uchar*)newURL;
	curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_URL, pWrkrData->restURL);
	DBGPRINTF("omhttp: using REST URL: '%s'\n", pWrkrData->restURL);

finalize_it:
	if (url != NULL)
		es_deleteStr(url);
	RETiRet;
}

/*
 * Renders the request and its response as one JSON document for the error log:
 * {
 *	"request": {
 *	 	"url": "https://url.com:443/path",
 *	 	"postdata": "mypayload" },
 *	"response": {
 *	 	"status": 400,
 *	 	"message": "error string" }
 * }
 *
 * reqmsg is the payload that was posted. On success *rendered receives a
 * malloc()ed, NUL-terminated string which the caller must free. On failure
 * an error code is returned and *rendered is not valid output.
 */
static rsRetVal
renderJsonErrorMessage(wrkrInstanceData_t *pWrkrData, uchar *reqmsg, char **rendered)
{
	DEFiRet;
	fjson_object *req = NULL;
	fjson_object *res = NULL;
	fjson_object *errRoot = NULL;

	if ((req = fjson_object_new_object()) == NULL)
		ABORT_FINALIZE(RS_RET_ERR);
	/* restURL is NULL until a POST URL has been set; never pass NULL as a string */
	fjson_object_object_add(req, "url", fjson_object_new_string(
		pWrkrData->restURL != NULL ? (char *)pWrkrData->restURL : ""));
	fjson_object_object_add(req, "postdata", fjson_object_new_string((char *)reqmsg));

	if ((res = fjson_object_new_object()) == NULL) {
		fjson_object_put(req); // cleanup request object
		ABORT_FINALIZE(RS_RET_ERR);
	}

	#define ERR_MSG_NULL "NULL: curl request failed or no response"
	fjson_object_object_add(res, "status", fjson_object_new_int(pWrkrData->httpStatusCode));
	if (pWrkrData->reply == NULL) {
		fjson_object_object_add(res, "message",
			fjson_object_new_string_len(ERR_MSG_NULL, strlen(ERR_MSG_NULL)));
	} else {
		fjson_object_object_add(res, "message",
			fjson_object_new_string_len(pWrkrData->reply, pWrkrData->replyLen));
	}

	if ((errRoot = fjson_object_new_object()) == NULL) {
		fjson_object_put(req); // cleanup request object
		fjson_object_put(res); // cleanup response object
		ABORT_FINALIZE(RS_RET_ERR);
	}

	/* from here on req and res are released together with errRoot */
	fjson_object_object_add(errRoot, "request", req);
	fjson_object_object_add(errRoot, "response", res);

	/* the caller runs strlen() on the result, so a failed copy must be reported */
	CHKmalloc(*rendered = strdup((char *) fjson_object_to_json_string(errRoot)));

finalize_it:
	if (errRoot != NULL)
		fjson_object_put(errRoot);

	RETiRet;
}

/* Append one error record (request plus response, rendered as JSON) to the
 * configured error file. Without an error file this is a no-op.
 * The file is opened on first use and stays open afterwards; it is not
 * closed here. Rendering and writing happen under the error file mutex.
 * A failed or short write is only logged and does not change the return code.
 */
static rsRetVal ATTR_NONNULL()
writeDataError(wrkrInstanceData_t *const pWrkrData,
	instanceData *const pData, uchar *const reqmsg)
{
	char *errRecord = NULL;
	size_t recLen;
	size_t toWrite;
	ssize_t written;
	sbool haveLock = 0;
	DEFiRet;

	if(pData->errorFile == NULL) {
		DBGPRINTF("omhttp: no local error logger defined - "
			"ignoring REST error information\n");
		FINALIZE;
	}

	pthread_mutex_lock(&pData->mutErrFile);
	haveLock = 1;

	CHKiRet(renderJsonErrorMessage(pWrkrData, reqmsg, &errRecord));

	if(pData->fdErrFile == -1) {
		pData->fdErrFile = open((char*)pData->errorFile,
					O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
					S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
		if(pData->fdErrFile == -1) {
			LogError(errno, RS_RET_ERR, "omhttp: error opening error file %s",
				pData->errorFile);
			ABORT_FINALIZE(RS_RET_ERR);
		}
	}

	/* deliberately no elaborate error handling for the error file itself */
	DBGPRINTF("omhttp: error record: '%s'\n", errRecord);
	recLen = strlen(errRecord);
	toWrite = recLen + 1;
	/* The terminating '\0' is replaced by '\n' so that no second buffer is
	 * needed; write() takes an explicit length. errRecord is no longer a
	 * C string after this line.
	 */
	errRecord[recLen] = '\n';
	written = write(pData->fdErrFile, errRecord, toWrite);
	if(written != (ssize_t) toWrite) {
		LogError(errno, RS_RET_IO_ERROR,
			"omhttp: error writing error file %s, write returned %lld",
			pData->errorFile, (long long) written);
	}

finalize_it:
	if(haveLock) {
		pthread_mutex_unlock(&pData->mutErrFile);
	}
	free(errRecord);
	RETiRet;
}

/* Re-inject every message of the current batch into the retry ruleset.
 * For each batch entry a new message object is built (raw message = the batch
 * string, tag "omhttp-retry") and submitted through the instance's
 * ratelimiter. The retry counter is bumped per message. Stops at the first
 * message that cannot be constructed and returns that error.
 */
static rsRetVal
queueBatchOnRetryRuleset(wrkrInstanceData_t *const pWrkrData, instanceData *const pData)
{
	static const char retryTag[] = "omhttp-retry";
	size_t idx = 0;
	DEFiRet;

	if (pData->retryRuleset == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: queueBatchOnRetryRuleset invalid call with a NULL retryRuleset");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	while (idx < pWrkrData->batch.nmemb) {
		uchar *const payload = pWrkrData->batch.data[idx];
		smsg_t *retryMsg;

		DBGPRINTF("omhttp: queueBatchOnRetryRuleset putting message '%s' into retry ruleset '%s'\n",
			payload, pData->retryRulesetName);

		/* build the message object */
		CHKiRet(msgConstruct(&retryMsg));
		CHKiRet(MsgSetFlowControlType(retryMsg, eFLOWCTL_FULL_DELAY));
		MsgSetInputName(retryMsg, pInputName);
		MsgSetRawMsg(retryMsg, (const char *)payload, ustrlen(payload));
		MsgSetMSGoffs(retryMsg, 0); /* no header */
		MsgSetTAG(retryMsg, (const uchar *)retryTag, sizeof(retryTag) - 1);

		/* hand it to the retry ruleset */
		MsgSetRuleset(retryMsg, pData->retryRuleset);
		ratelimitAddMsg(pData->ratelimiter, NULL, retryMsg);

		/* counted per message, as the loop may stop before the batch is done */
		STATSCOUNTER_INC(ctrMessagesRetry, mutCtrMessagesRetry);
		++idx;
	}
finalize_it:
	RETiRet;
}

/* Evaluate the HTTP status of the last request, update the statistics and
 * decide what rsyslog should do with the message(s).
 *
 * reqmsg is the posted payload; it is only used for the error file record.
 * Returns:
 *   RS_RET_OK        status below 300, or a retriable failure in batch mode
 *                    with batch.maxsize > 1 (the batch is handed to the retry
 *                    ruleset if one is configured, otherwise dropped)
 *   RS_RET_DATAFAIL  status 300-499, not retried
 *   RS_RET_SUSPENDED status 0 (request failed) or 500+ outside of that
 *                    batch case
 */
static rsRetVal
checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
{
	instanceData *pData;
	long statusCode;
	size_t numMessages;
	DEFiRet;

	pData = pWrkrData->pData;
	statusCode = pWrkrData->httpStatusCode;

	if (pData->batchMode) {
		numMessages = pWrkrData->batch.nmemb;
	} else {
		numMessages = 1;
	}

	// 500+ errors return RS_RET_SUSPENDED if NOT batchMode and should be retried
	// status 0 is the default and the request failed for some reason, retry this too
	// 400-499 are malformed input and should not be retried just logged instead
	if (statusCode == 0) {
		// request failed, suspend or retry
		STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
		iRet = RS_RET_SUSPENDED;
	} else if (statusCode >= 500) {
		// server error, suspend or retry
		STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
		STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
		iRet = RS_RET_SUSPENDED;
	} else if (statusCode >= 300) {
		// redirection or client error, NO suspend nor retry
		STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
		STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
		iRet = RS_RET_DATAFAIL;
	} else {
		// success, normal state
		// includes 2XX (success like 200-OK)
		// includes 1XX (informational like 100-Continue)
		STATSCOUNTER_INC(ctrHttpStatusSuccess, mutCtrHttpStatusSuccess);
		STATSCOUNTER_ADD(ctrMessagesSuccess, mutCtrMessagesSuccess, numMessages);
		iRet = RS_RET_OK;
	}

	if (iRet != RS_RET_OK) {
		LogMsg(0, iRet, LOG_ERR, "omhttp: checkResult error http status code: %ld reply: %s",
			statusCode, pWrkrData->reply != NULL ? pWrkrData->reply : "NULL");

		writeDataError(pWrkrData, pWrkrData->pData, reqmsg);

		if (iRet == RS_RET_DATAFAIL)
			ABORT_FINALIZE(iRet);

		if (pData->batchMode && pData->maxBatchSize > 1) {
			// Write each message back to retry ruleset if configured
			if (pData->retryFailures && pData->retryRuleset != NULL) {
				// Retry stats counted inside this function call
				iRet = queueBatchOnRetryRuleset(pWrkrData, pData);
				if (iRet != RS_RET_OK) {
					/* the two literals are concatenated, so the
					 * separator must be part of the first one
					 */
					LogMsg(0, iRet, LOG_ERR,
						"omhttp: checkResult error while queueing to retry ruleset, "
						"some messages may be lost");
				}
			}
			iRet = RS_RET_OK; // We've done all we can tell rsyslog to carry on
		}
	}

finalize_it:
	RETiRet;
}

/* Compress a buffer before sending using zlib. Based on code from tools/omfwd.c
 * Initialize the zstrm object for gzip compression, using this init function.
 * deflateInit2(z_stream strm, int level, int method,
 *                             int windowBits, int memLevel, int strategy);
 * strm: the zlib stream held in pWrkrData
 * level: the compression level held in pData
 * method: the operation constant Z_DEFLATED
 * windowBits: the size of the compression window 15 = log_2(32768)
 *     to configure as gzip add 16 to windowBits (w | 16) for final value 31
 * memLevel: the memory optimization level (8 is default)
 * strategy: using Z_DEFAULT_STRATEGY is default
 *
 * message/len describe the uncompressed payload. On success the gzip stream
 * is available in pWrkrData->compressCtx (buf, curLen). On any failure
 * RS_RET_ZLIB_ERR or an allocation error is returned. The deflate stream is
 * torn down again before returning in either case.
 */
static rsRetVal
compressHttpPayload(wrkrInstanceData_t *pWrkrData, uchar *message, unsigned len)
{
	int zRet;
	unsigned outavail;
	uchar zipBuf[32*1024];

	DEFiRet;

	if (!pWrkrData->bzInitDone) {
		pWrkrData->zstrm.zalloc = Z_NULL;
		pWrkrData->zstrm.zfree = Z_NULL;
		pWrkrData->zstrm.opaque = Z_NULL;
		zRet = deflateInit2(&pWrkrData->zstrm, pWrkrData->pData->compressionLevel,
			Z_DEFLATED, 31, 8, Z_DEFAULT_STRATEGY);
		if (zRet != Z_OK) {
			DBGPRINTF("omhttp: compressHttpPayload error %d returned from zlib/deflateInit2()\n", zRet);
			ABORT_FINALIZE(RS_RET_ZLIB_ERR);
		}
		pWrkrData->bzInitDone = 1;
	}

	CHKiRet(resetCompressCtx(pWrkrData, len));

	/* now doing the compression */
	pWrkrData->zstrm.next_in = (Bytef*) message;
	pWrkrData->zstrm.avail_in = len;
	/* run deflate() on buffer until everything has been compressed */
	do {
		/* avail_in/avail_out are unsigned int, total_in is unsigned long in zlib */
		DBGPRINTF("omhttp: compressHttpPayload in deflate() loop, avail_in %u, total_in %lu\n",
				pWrkrData->zstrm.avail_in, pWrkrData->zstrm.total_in);
		pWrkrData->zstrm.avail_out = sizeof(zipBuf);
		pWrkrData->zstrm.next_out = zipBuf;

		zRet = deflate(&pWrkrData->zstrm, Z_NO_FLUSH);
		DBGPRINTF("omhttp: compressHttpPayload after deflate, ret %d, avail_out %u\n",
				zRet, pWrkrData->zstrm.avail_out);
		if (zRet != Z_OK)
			ABORT_FINALIZE(RS_RET_ZLIB_ERR);
		outavail = sizeof(zipBuf) - pWrkrData->zstrm.avail_out;
		if (outavail != 0)
			CHKiRet(appendCompressCtx(pWrkrData, zipBuf, outavail));

	} while (pWrkrData->zstrm.avail_out == 0);

	/* Finish the stream: call deflate() with Z_FINISH and no new input until
	 * it reports Z_STREAM_END. Z_OK means more output is pending. Any other
	 * result is an error, as a fresh output buffer is supplied on every
	 * round, and must not be sent as if it were a complete gzip stream.
	 */
	pWrkrData->zstrm.avail_in = 0;
	do {
		pWrkrData->zstrm.avail_out = sizeof(zipBuf);
		pWrkrData->zstrm.next_out = zipBuf;
		zRet = deflate(&pWrkrData->zstrm, Z_FINISH);
		if (zRet != Z_OK && zRet != Z_STREAM_END) {
			DBGPRINTF("omhttp: compressHttpPayload error %d returned from zlib/deflate(Z_FINISH)\n",
				zRet);
			ABORT_FINALIZE(RS_RET_ZLIB_ERR);
		}
		outavail = sizeof(zipBuf) - pWrkrData->zstrm.avail_out;
		if (outavail != 0)
			CHKiRet(appendCompressCtx(pWrkrData, zipBuf, outavail));

	} while (zRet != Z_STREAM_END);

finalize_it:
	if (pWrkrData->bzInitDone)
		deflateEnd(&pWrkrData->zstrm);
	pWrkrData->bzInitDone = 0;
	RETiRet;

}

/* Put the compression buffer into its initial state: no memory, no content. */
static void ATTR_NONNULL()
initCompressCtx(wrkrInstanceData_t *pWrkrData)
{
	pWrkrData->compressCtx.len = 0;
	pWrkrData->compressCtx.curLen = 0;
	pWrkrData->compressCtx.buf = NULL;
}

/* Release the compression buffer and return the context to the same state
 * initCompressCtx() establishes. Resetting the sizes together with the
 * pointer keeps the context consistent: a stale capacity next to a NULL
 * buffer would let appendCompressCtx() skip the grow step and copy to NULL.
 */
static void ATTR_NONNULL()
freeCompressCtx(wrkrInstanceData_t *pWrkrData)
{
	free(pWrkrData->compressCtx.buf); /* free(NULL) is a no-op */
	pWrkrData->compressCtx.buf = NULL;
	pWrkrData->compressCtx.curLen = 0;
	pWrkrData->compressCtx.len = 0;
}


/* Prepare the compression buffer for a new payload: drop the buffered content
 * and resize the buffer to len bytes. If resizing fails the buffer is
 * released and the error is returned.
 */
static rsRetVal ATTR_NONNULL()
resetCompressCtx(wrkrInstanceData_t *pWrkrData, size_t len)
{
	DEFiRet;

	pWrkrData->compressCtx.len = len;
	pWrkrData->compressCtx.curLen = 0;
	iRet = growCompressCtx(pWrkrData, len);
	if (iRet != RS_RET_OK) {
		freeCompressCtx(pWrkrData);
	}
	RETiRet;
}

/* Resize the compression buffer to exactly newLen bytes, allocating it if it
 * does not exist yet. On allocation failure the existing buffer and the
 * recorded size are left as they were.
 */
static rsRetVal ATTR_NONNULL()
growCompressCtx(wrkrInstanceData_t *pWrkrData, size_t newLen)
{
	uchar *resized;
	DEFiRet;

	/* realloc() on a NULL pointer acts like malloc(), so one call covers both cases */
	resized = (uchar *)realloc(pWrkrData->compressCtx.buf, newLen * sizeof(uchar));
	CHKmalloc(resized);
	pWrkrData->compressCtx.buf = resized;
	pWrkrData->compressCtx.len = newLen;
finalize_it:
	RETiRet;
}

/* Append srcLen bytes from srcBuf to the compression buffer, enlarging the
 * buffer first when the data does not fit. If enlarging fails the whole
 * buffer is released and the error is returned.
 */
static rsRetVal ATTR_NONNULL()
appendCompressCtx(wrkrInstanceData_t *pWrkrData, uchar *srcBuf, size_t srcLen)
{
	const size_t used = pWrkrData->compressCtx.curLen;
	const size_t needed = used + srcLen;
	DEFiRet;

	if (needed > pWrkrData->compressCtx.len) {
		iRet = growCompressCtx(pWrkrData, needed);
		if (iRet != RS_RET_OK) {
			freeCompressCtx(pWrkrData);
			RETiRet;
		}
	}

	memcpy(pWrkrData->compressCtx.buf + used, srcBuf, srcLen);
	pWrkrData->compressCtx.curLen = needed;
	RETiRet;
}

/* Some duplicate code to curlSetup, but we need to add the gzip content-encoding
 * header at runtime, and if the compression fails, we do not want to send it.
 * Additionally, the curlCheckConnHandle should not be configured with a gzip header.
 */
static rsRetVal ATTR_NONNULL()
buildCurlHeaders(wrkrInstanceData_t *pWrkrData, sbool contentEncodeGzip)
{
	struct curl_slist *slist = NULL;

	DEFiRet;

	if (pWrkrData->pData->httpcontenttype != NULL) {
		// If content type specified use it, otherwise use a sane default
		slist = curl_slist_append(slist, (char *)pWrkrData->pData->headerContentTypeBuf);
	} else {
		if (pWrkrData->pData->batchMode) {
			// If in batch mode, use the approprate content type header for the format,
			// defaulting to text/plain with newline
			switch (pWrkrData->pData->batchFormat) {
				case FMT_JSONARRAY:
					slist = curl_slist_append(slist, HTTP_HEADER_CONTENT_JSON);
					break;
				case FMT_KAFKAREST:
					slist = curl_slist_append(slist, HTTP_HEADER_CONTENT_KAFKA);
					break;
				case FMT_NEWLINE:
					slist = curl_slist_append(slist, HTTP_HEADER_CONTENT_TEXT);
					break;
				case FMT_LOKIREST:
					slist = curl_slist_append(slist, HTTP_HEADER_CONTENT_JSON);
					break;
				default:
					slist = curl_slist_append(slist, HTTP_HEADER_CONTENT_TEXT);
			}
		} else {
			// Otherwise non batch, presume most users are sending JSON
			slist = curl_slist_append(slist, HTTP_HEADER_CONTENT_JSON);
		}
	}

	CHKmalloc(slist);

	// Configured headers..
	if (pWrkrData->pData->headerBuf != NULL) {
		slist = curl_slist_append(slist, (char *)pWrkrData->pData->headerBuf);
		CHKmalloc(slist);
	}

	for (int k = 0 ; k < pWrkrData->pData->nHttpHeaders; k++) {
		slist = curl_slist_append(slist, (char *)pWrkrData->pData->httpHeaders[k]);
		CHKmalloc(slist);
	}

	// When sending more than 1Kb, libcurl automatically sends an Except: 100-Continue header
	// and will wait 1s for a response, could make this configurable but for now disable
	slist = curl_slist_append(slist, HTTP_HEADER_EXPECT_EMPTY);
	CHKmalloc(slist);

	if (contentEncodeGzip) {
		slist = curl_slist_append(slist, HTTP_HEADER_ENCODING_GZIP);
		CHKmalloc(slist);
	}

	if (pWrkrData->curlHeader != NULL)
		curl_slist_free_all(pWrkrData->curlHeader);

	pWrkrData->curlHeader = slist;

finalize_it:
	if (iRet != RS_RET_OK) {
		curl_slist_free_all(slist);
		LogError(0, iRet, "omhttp: error allocating curl header slist, using previous one");
	}
	RETiRet;
}



/* Post one payload to the server.
 *
 * pWrkrData - worker state; supplies the curl post handle, the compression
 *             context and receives reply / httpStatusCode.
 * message   - payload to send (also handed to checkResult()).
 * msglen    - payload length in bytes.
 * tpls      - template strings forwarded to setPostURL(); may be NULL.
 * nmsgs     - unused.
 *
 * Returns RS_RET_SUSPENDED when curl_easy_perform() fails, otherwise whatever
 * checkConn()/setPostURL()/checkResult() report. The reply buffer is always
 * released before returning.
 */
static rsRetVal ATTR_NONNULL(1, 2)
curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls,
		const int nmsgs __attribute__((unused)))
{
	CURLcode curlCode;
	CURL *const curl = pWrkrData->curlPostHandle;
	char errbuf[CURL_ERROR_SIZE] = "";

	char *postData;
	int postLen;
	sbool compressed;
	DEFiRet;

	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);

	if(pWrkrData->pData->numServers > 1) {
		/* needs to be called to support ES HA feature */
		CHKiRet(checkConn(pWrkrData));
	}
	CHKiRet(setPostURL(pWrkrData, tpls));

	pWrkrData->reply = NULL;
	pWrkrData->replyLen = 0;
	pWrkrData->httpStatusCode = 0;

	postData = (char *)message;
	postLen = msglen;
	compressed = 0;

	if (pWrkrData->pData->compress) {
		iRet = compressHttpPayload(pWrkrData, message, msglen);
		if (iRet != RS_RET_OK) {
			LogError(0, iRet, "omhttp: curlPost error while compressing, will default to uncompressed");
		} else {
			postData = (char *)pWrkrData->compressCtx.buf;
			postLen = pWrkrData->compressCtx.curLen;
			compressed = 1;
			DBGPRINTF("omhttp: curlPost compressed %d to %d bytes\n", msglen, postLen);
		}
	}

	/* return value intentionally not checked: the call site has always
	 * proceeded with whatever is in pWrkrData->curlHeader */
	buildCurlHeaders(pWrkrData, compressed);

	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postData);
	/* libcurl reads this option as a long from the varargs list, so the
	 * int must be widened explicitly */
	curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long) postLen);
	curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_HTTPHEADER, pWrkrData->curlHeader);
	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf);

	curlCode = curl_easy_perform(curl);
	DBGPRINTF("omhttp: curlPost curl returned %lld\n", (long long) curlCode);
	STATSCOUNTER_INC(ctrHttpRequestCount, mutCtrHttpRequestCount);

	if (curlCode != CURLE_OK) {
		STATSCOUNTER_INC(ctrHttpRequestFail, mutCtrHttpRequestFail);
		LogError(0, RS_RET_SUSPENDED,
			"omhttp: suspending ourselves due to server failure %lld: %s",
			(long long) curlCode, errbuf);
		// Check the result here too and retry if needed, then we should suspend
		// Usually in batch mode we clobber any iRet values, but probably not a great
		// idea to keep hitting a dead server. The http status code will be 0 at this point.
		checkResult(pWrkrData, message);
		ABORT_FINALIZE(RS_RET_SUSPENDED);
	} else {
		STATSCOUNTER_INC(ctrHttpRequestSuccess, mutCtrHttpRequestSuccess);
	}

	// Grab the HTTP Response code
	curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &pWrkrData->httpStatusCode);
	if(pWrkrData->reply == NULL) {
		DBGPRINTF("omhttp: curlPost pWrkrData reply==NULL, replyLen = '%d'\n",
			pWrkrData->replyLen);
	} else {
		DBGPRINTF("omhttp: curlPost pWrkrData replyLen = '%d'\n", pWrkrData->replyLen);
		if(pWrkrData->replyLen > 0) {
			pWrkrData->reply[pWrkrData->replyLen] = '\0';
			/* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */
		}
		//TODO: replyLen++? because 0 Byte is appended
		DBGPRINTF("omhttp: curlPost pWrkrData reply: '%s'\n", pWrkrData->reply);
	}
	CHKiRet(checkResult(pWrkrData, message));

finalize_it:
	/* errbuf lives in this stack frame while the curl handle outlives the
	 * call; detach it so libcurl never holds a pointer to dead stack memory */
	if (curl != NULL) {
		curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, NULL);
	}
	incrementServerIndex(pWrkrData);
	if (pWrkrData->reply != NULL) {
		free(pWrkrData->reply);
		pWrkrData->reply = NULL; /* don't leave dangling pointer */
	}
	RETiRet;
}

/* Build a JSON batch that conforms to the Kafka Rest Proxy format.
 * See https://docs.confluent.io/current/kafka-rest/docs/quickstart.html for more info.
 * Want {"records": [{"value": "message1"}, {"value": "message2"}]}
 *
 * Each batch element is parsed as JSON; elements that fail to parse are
 * logged and left out of the batch. On success *batchBuf receives a newly
 * allocated string that the caller must free().
 *
 * Returns RS_RET_OK, RS_RET_ERR when a JSON object cannot be created or
 * rendered, or RS_RET_OUT_OF_MEMORY when the result cannot be copied.
 */
static rsRetVal
serializeBatchKafkaRest(wrkrInstanceData_t *pWrkrData, char **batchBuf)
{
	fjson_object *batchArray = NULL;
	fjson_object *recordObj = NULL;
	fjson_object *valueObj = NULL;
	fjson_object *msgObj = NULL;
	const char *batchString = NULL;

	size_t numMessages = pWrkrData->batch.nmemb;
	size_t sizeTotal = pWrkrData->batch.sizeBytes + numMessages + 1; // messages + brackets + commas
	DBGPRINTF("omhttp: serializeBatchKafkaRest numMessages=%zu sizeTotal=%zu\n", numMessages, sizeTotal);

	DEFiRet;

	batchArray = fjson_object_new_array();
	if (batchArray == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchKafkaRest failed to create array");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	for (size_t i = 0; i < numMessages; i++) {
		/* parse first so that no wrapper object is allocated (and leaked)
		 * for a message that is skipped */
		msgObj = fjson_tokener_parse((char *) pWrkrData->batch.data[i]);
		if (msgObj == NULL) {
			LogError(0, NO_ERRCODE,
				"omhttp: serializeBatchKafkaRest failed to parse %s as json ignoring it",
				pWrkrData->batch.data[i]);
			continue;
		}

		valueObj = fjson_object_new_object();
		if (valueObj == NULL) {
			fjson_object_put(msgObj); // not yet owned by anything
			LogError(0, RS_RET_ERR, "omhttp: serializeBatchKafkaRest failed to create value object");
			ABORT_FINALIZE(RS_RET_ERR);
		}
		fjson_object_object_add(valueObj, "value", msgObj);
		fjson_object_array_add(batchArray, valueObj);
	}

	recordObj = fjson_object_new_object();
	if (recordObj == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchKafkaRest failed to create record object");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	fjson_object_object_add(recordObj, "records", batchArray);
	batchArray = NULL; /* now released together with recordObj */

	batchString = fjson_object_to_json_string_ext(recordObj, FJSON_TO_STRING_PLAIN);
	if (batchString == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchKafkaRest failed to render batch");
		ABORT_FINALIZE(RS_RET_ERR);
	}
	CHKmalloc(*batchBuf = strdup(batchString));

finalize_it:
	if (batchArray != NULL) {
		/* only reached on error paths before the array was attached */
		fjson_object_put(batchArray);
		batchArray = NULL;
	}
	if (recordObj != NULL) {
		fjson_object_put(recordObj);
		recordObj = NULL;
	}

	RETiRet;
}

/* Build a JSON batch of the form {"streams": [msg1, msg2, ...]}.
 *
 * Each batch element is parsed as JSON and appended as-is to the "streams"
 * array; elements that fail to parse are logged and left out. On success
 * *batchBuf receives a newly allocated string that the caller must free().
 *
 * Returns RS_RET_OK, RS_RET_ERR when a JSON object cannot be created or
 * rendered, or RS_RET_OUT_OF_MEMORY when the result cannot be copied.
 */
static rsRetVal
serializeBatchLokiRest(wrkrInstanceData_t *pWrkrData, char **batchBuf)
{
	fjson_object *batchArray = NULL;
	fjson_object *recordObj = NULL;
	fjson_object *msgObj = NULL;
	const char *batchString = NULL;

	size_t numMessages = pWrkrData->batch.nmemb;
	size_t sizeTotal = pWrkrData->batch.sizeBytes + numMessages + 1; // messages + brackets + commas
	DBGPRINTF("omhttp: serializeBatchLokiRest numMessages=%zu sizeTotal=%zu\n", numMessages, sizeTotal);

	DEFiRet;

	batchArray = fjson_object_new_array();
	if (batchArray == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchLokiRest failed to create array");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	/* No per-message wrapper object is needed for this format: the parsed
	 * message itself becomes the array element. */
	for (size_t i = 0; i < numMessages; i++) {
		DBGPRINTF("omhttp: serializeBatchLokiRest parsing message [%s]\n",(char *) pWrkrData->batch.data[i]);
		msgObj = fjson_tokener_parse((char *) pWrkrData->batch.data[i]);
		if (msgObj == NULL) {
			LogError(0, NO_ERRCODE,
				"omhttp: serializeBatchLokiRest failed to parse %s as json ignoring it",
				pWrkrData->batch.data[i]);
			continue;
		}
		fjson_object_array_add(batchArray, msgObj);
	}

	recordObj = fjson_object_new_object();
	if (recordObj == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchLokiRest failed to create record object");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	fjson_object_object_add(recordObj, "streams", batchArray);
	batchArray = NULL; /* now released together with recordObj */

	batchString = fjson_object_to_json_string_ext(recordObj, FJSON_TO_STRING_PLAIN);
	if (batchString == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchLokiRest failed to render batch");
		ABORT_FINALIZE(RS_RET_ERR);
	}
	CHKmalloc(*batchBuf = strdup(batchString));

finalize_it:
	if (batchArray != NULL) {
		/* only reached on error paths before the array was attached */
		fjson_object_put(batchArray);
		batchArray = NULL;
	}
	if (recordObj != NULL) {
		fjson_object_put(recordObj);
		recordObj = NULL;
	}

	RETiRet;
}
/* Build a JSON batch by placing each element in an array.
 *
 * Each batch element is parsed as JSON; elements that fail to parse are
 * logged and left out. On success *batchBuf receives a newly allocated
 * string that the caller must free().
 *
 * Returns RS_RET_OK, RS_RET_ERR when the array cannot be created or
 * rendered, or RS_RET_OUT_OF_MEMORY when the result cannot be copied.
 */
static rsRetVal
serializeBatchJsonArray(wrkrInstanceData_t *pWrkrData, char **batchBuf)
{
	fjson_object *batchArray = NULL;
	fjson_object *msgObj = NULL;
	const char *batchString = NULL;
	size_t numMessages = pWrkrData->batch.nmemb;
	size_t sizeTotal = pWrkrData->batch.sizeBytes + numMessages + 1; // messages + brackets + commas
	DBGPRINTF("omhttp: serializeBatchJsonArray numMessages=%zu sizeTotal=%zu\n", numMessages, sizeTotal);

	DEFiRet;

	batchArray = fjson_object_new_array();
	if (batchArray == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchJsonArray failed to create array");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	for (size_t i = 0; i < numMessages; i++) {
		msgObj = fjson_tokener_parse((char *) pWrkrData->batch.data[i]);
		if (msgObj == NULL) {
			LogError(0, NO_ERRCODE,
				"omhttp: serializeBatchJsonArray failed to parse %s as json, ignoring it",
				pWrkrData->batch.data[i]);
			continue;
		}
		fjson_object_array_add(batchArray, msgObj);
	}

	batchString = fjson_object_to_json_string_ext(batchArray, FJSON_TO_STRING_PLAIN);
	if (batchString == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchJsonArray failed to render batch");
		ABORT_FINALIZE(RS_RET_ERR);
	}
	/* fail loudly on allocation failure instead of returning RS_RET_OK
	 * with a NULL buffer */
	CHKmalloc(*batchBuf = strdup(batchString));

finalize_it:
	if (batchArray != NULL) {
		fjson_object_put(batchArray);
		batchArray = NULL;
	}
	RETiRet;
}

/* Join all batch elements into one string, separated by single newline
 * characters (no trailing newline). On success *batchBuf receives the
 * C string produced by es_str2cstr(). Returns RS_RET_ERR on any failure.
 */
static rsRetVal
serializeBatchNewline(wrkrInstanceData_t *pWrkrData, char **batchBuf)
{
	DEFiRet;
	const size_t numMessages = pWrkrData->batch.nmemb;
	const size_t sizeTotal = pWrkrData->batch.sizeBytes + numMessages; // message + newline + null term
	int rc = 0;
	es_str_t *joined;

	DBGPRINTF("omhttp: serializeBatchNewline numMessages=%zd sizeTotal=%zd\n", numMessages, sizeTotal);

	joined = es_newStr(1024);
	if (joined == NULL) {
		ABORT_FINALIZE(RS_RET_ERR);
	}

	/* separator goes in front of every element but the first; stop
	 * appending as soon as one libestr call reports an error */
	for (size_t idx = 0; rc == 0 && idx < numMessages; ++idx) {
		if (idx > 0) {
			rc = es_addChar(&joined, '\n');
		}
		if (rc == 0) {
			rc = es_addBuf(&joined, (char *)pWrkrData->batch.data[idx],
				ustrlen(pWrkrData->batch.data[idx]));
		}
	}

	if (rc == 0) {
		*batchBuf = (char *) es_str2cstr(joined, NULL);
	}

	if (rc != 0 || *batchBuf == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: serializeBatchNewline failed to build batch string");
		ABORT_FINALIZE(RS_RET_ERR);
	}

finalize_it:
	if (joined != NULL) {
		es_deleteStr(joined);
	}

	RETiRet;
}

/* Estimate the serialized size in bytes of the current batch for the
 * configured batch format: payload bytes, plus per-format framing
 * overhead, plus one byte for the terminating NUL.
 * Used to decide if a batch should be flushed early.
 */
static size_t
computeBatchSize(wrkrInstanceData_t *pWrkrData)
{
	const size_t payloadBytes = pWrkrData->batch.sizeBytes;
	const size_t count = pWrkrData->batch.nmemb;
	size_t overhead;

	switch (pWrkrData->pData->batchFormat) {
		case FMT_JSONARRAY:
			// '[' and ']' plus (count - 1) commas; an empty array is "[]"
			overhead = (count == 0) ? 2 : count + 1;
			break;
		case FMT_KAFKAREST:
			// {"records":[]} is 14 bytes, each {"value":} wrapper adds 10
			overhead = 14 + (count * 10);
			break;
		case FMT_LOKIREST:
			// {"streams":[]} is 14 bytes, plus 2 bytes counted per message
			// NOTE(review): the per-message constant is taken as-is from the
			// existing formula - confirm it matches the intended framing
			overhead = 14 + (count * 2);
			break;
		case FMT_NEWLINE:
		default:
			// one newline between each pair of neighbouring messages
			overhead = (count == 0) ? 0 : count - 1;
			break;
	}

	return payloadBytes + overhead + 1; // plus a null
}

/* Reset the worker's batch to "empty": no members and zero payload bytes.
 * The data array itself is left untouched.
 */
static void ATTR_NONNULL()
initializeBatch(wrkrInstanceData_t *pWrkrData)
{
	pWrkrData->batch.nmemb = 0;
	pWrkrData->batch.sizeBytes = 0;
}

/* Adds a message to this worker's batch.
 *
 * The message pointer is stored as-is (no copy) and its length is added to
 * the running byte count. Returns RS_RET_ERR without modifying the batch
 * when the batch already holds maxBatchSize entries.
 */
static rsRetVal
buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message)
{
	DEFiRet;

	if (pWrkrData->batch.nmemb >= pWrkrData->pData->maxBatchSize) {
		/* note the space after the comma: the two literals are concatenated */
		LogError(0, RS_RET_ERR, "omhttp: buildBatch something has gone wrong, "
			"number of messages in batch is bigger than the max batch size, bailing");
		ABORT_FINALIZE(RS_RET_ERR);
	}
	pWrkrData->batch.data[pWrkrData->batch.nmemb] = message;
	pWrkrData->batch.sizeBytes += strlen((char *)message);
	pWrkrData->batch.nmemb++;

finalize_it:
	RETiRet;
}

/* Serialize the worker's current batch in the configured batch format and
 * post it with curlPost().
 *
 * Returns the serializer's error if serialization fails,
 * RS_RET_OUT_OF_MEMORY if the serializer produced no buffer, otherwise the
 * result of curlPost(). The serialized buffer is always freed.
 */
static rsRetVal
submitBatch(wrkrInstanceData_t *pWrkrData)
{
	DEFiRet;
	char *batchBuf = NULL;

	switch (pWrkrData->pData->batchFormat) {
		case FMT_JSONARRAY:
			iRet = serializeBatchJsonArray(pWrkrData, &batchBuf);
			break;
		case FMT_KAFKAREST:
			iRet = serializeBatchKafkaRest(pWrkrData, &batchBuf);
			break;
		case FMT_LOKIREST:
			iRet = serializeBatchLokiRest(pWrkrData, &batchBuf);
			break;
		case FMT_NEWLINE:
		default:
			iRet = serializeBatchNewline(pWrkrData, &batchBuf);
			break;
	}

	if (iRet != RS_RET_OK) {
		FINALIZE;
	}
	if (batchBuf == NULL) {
		/* a serializer that reports success without a buffer must not be
		 * treated as a delivered batch */
		LogError(0, RS_RET_OUT_OF_MEMORY, "omhttp: submitBatch got no serialized batch buffer");
		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
	}

	DBGPRINTF("omhttp: submitBatch, batch: '%s'\n", batchBuf);

	CHKiRet(curlPost(pWrkrData, (uchar*) batchBuf, strlen(batchBuf),
		NULL, pWrkrData->batch.nmemb));

finalize_it:
	free(batchBuf);
	RETiRet;
}

/* Transaction start: when batch mode is enabled, reset the worker's batch
 * counters via initializeBatch(); otherwise there is nothing to prepare.
 */
BEGINbeginTransaction
CODESTARTbeginTransaction
	if(!pWrkrData->pData->batchMode) {
		FINALIZE;
	}

	initializeBatch(pWrkrData);
finalize_it:
ENDbeginTransaction

/* Process one message (ppString[0]).
 *
 * Non-batch mode: post the message immediately with curlPost().
 * Batch mode with maxBatchSize == 1: build and post a one-element batch.
 * Batch mode otherwise: flush the pending batch first if adding this message
 * would exceed the count or byte limits, then queue the message and report
 * RS_RET_PREVIOUS_COMMITTED (batch now holds exactly one element) or
 * RS_RET_DEFER_COMMIT.
 */
BEGINdoAction
size_t nBytes;
sbool submit;
CODESTARTdoAction

	STATSCOUNTER_INC(ctrMessagesSubmitted, mutCtrMessagesSubmitted);

	if (pWrkrData->pData->batchMode) {
		/* If the maxbatchsize is 1, then build and immediately post a batch with 1 element.
		 * This mode will play nicely with rsyslog's action.resumeRetryCount logic.
		 */
		if (pWrkrData->pData->maxBatchSize == 1) {
			initializeBatch(pWrkrData);
			CHKiRet(buildBatch(pWrkrData, ppString[0]));
			CHKiRet(submitBatch(pWrkrData));
			FINALIZE;
		}

		/* We should submit if any of these conditions are true
		 * 1. Total batch size > pWrkrData->pData->maxBatchSize
		 * 2. Total bytes > pWrkrData->pData->maxBatchBytes
		 */
		/* NOTE(review): the reason for the "- 1" is not evident here; for an
		 * empty string it wraps, which the unsigned addition below undoes */
		nBytes = ustrlen((char *)ppString[0]) - 1 ;
		submit = 0;

		if (pWrkrData->batch.nmemb >= pWrkrData->pData->maxBatchSize) {
			submit = 1;
			DBGPRINTF("omhttp: maxbatchsize limit reached submitting batch of %zd elements.\n",
				pWrkrData->batch.nmemb);
		} else if (computeBatchSize(pWrkrData) + nBytes > pWrkrData->pData->maxBatchBytes) {
			submit = 1;
			DBGPRINTF("omhttp: maxbytes limit reached submitting partial batch of %zd elements.\n",
				pWrkrData->batch.nmemb);
		}

		/* A single message larger than maxBatchBytes trips the byte limit
		 * even when nothing is queued yet; do not post an empty batch then. */
		if (submit && pWrkrData->batch.nmemb > 0) {
			CHKiRet(submitBatch(pWrkrData));
			initializeBatch(pWrkrData);
		}

		CHKiRet(buildBatch(pWrkrData, ppString[0]));

		/* If there is only one item in the batch, all previous items have been
		 * submitted or this is the first item for this transaction. Return previous
		 * committed so that all items leading up to the current (exclusive)
		 * are not replayed should a failure occur anywhere else in the transaction. */
		iRet = pWrkrData->batch.nmemb == 1 ? RS_RET_PREVIOUS_COMMITTED : RS_RET_DEFER_COMMIT;
	} else {
		CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), ppString, 1));
	}
finalize_it:
ENDdoAction


BEGINendTransaction
CODESTARTendTransaction
	/* End Transaction only if batch data is not empty */
	if (pWrkrData->batch.nmemb > 0) {
		CHKiRet(submitBatch(pWrkrData));
	} else {
		dbgprintf("omhttp: endTransaction, pWrkrData->batch.nmemb = 0, "
			"nothing to send. \n");
	}
finalize_it:
ENDendTransaction

/* Build the credential string "uid:pwd" (pwd part omitted when pwd is NULL)
 * and store it as a C string in *authBuf. Returns RS_RET_ERR on any
 * allocation or string-building failure.
 */
static rsRetVal
computeAuthHeader(char* uid, char* pwd, uchar** authBuf)
{
	int rc;
	es_str_t* credentials;
	DEFiRet;

	credentials = es_newStr(1024);
	if (credentials == NULL) {
		LogError(0, RS_RET_OUT_OF_MEMORY,
			"omhttp: failed to allocate es_str auth for auth header construction");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	rc = es_addBuf(&credentials, uid, strlen(uid));
	if (rc == 0) {
		rc = es_addChar(&credentials, ':');
	}
	if (rc == 0 && pwd != NULL) {
		rc = es_addBuf(&credentials, pwd, strlen(pwd));
	}

	/* string assembly failed: *authBuf is deliberately left untouched */
	if (rc != 0) {
		LogError(0, RS_RET_ERR, "omhttp: failed to build auth header\n");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	*authBuf = (uchar*) es_str2cstr(credentials, NULL);
	if (*authBuf == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: failed to build auth header\n");
		ABORT_FINALIZE(RS_RET_ERR);
	}

finalize_it:
	if (credentials != NULL) {
		es_deleteStr(credentials);
	}
	RETiRet;
}

/* Build the header line "key: value" (value part omitted when value is
 * NULL) and store it as a C string in *headerBuf. Returns RS_RET_ERR on any
 * allocation or string-building failure.
 */
static rsRetVal
computeApiHeader(char* key, char* value, uchar** headerBuf)
{
	int rc;
	es_str_t* line;
	DEFiRet;

	line = es_newStr(10240);
	if (line == NULL) {
		LogError(0, RS_RET_OUT_OF_MEMORY,
		"omhttp: failed to allocate es_str auth for api header construction");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	rc = es_addBuf(&line, key, strlen(key));
	if (rc == 0) {
		rc = es_addChar(&line, ':');
	}
	if (rc == 0) {
		rc = es_addChar(&line, ' ');
	}
	if (rc == 0 && value != NULL) {
		rc = es_addBuf(&line, value, strlen(value));
	}

	/* string assembly failed: *headerBuf is deliberately left untouched */
	if (rc != 0) {
		LogError(0, RS_RET_ERR, "omhttp: failed to build http header\n");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	*headerBuf = (uchar*) es_str2cstr(line, NULL);
	if (*headerBuf == NULL) {
		LogError(0, RS_RET_ERR, "omhttp: failed to build http header\n");
		ABORT_FINALIZE(RS_RET_ERR);
	}

finalize_it:
	if (line != NULL) {
		es_deleteStr(line);
	}
	RETiRet;
}

/* Apply the options shared by every curl handle of this worker: request
 * headers, reply callback, TLS verification switches, credentials and
 * certificate files taken from the instance configuration.
 *
 * Numeric options are passed as long constants because curl_easy_setopt()
 * is variadic and reads them as long.
 */
static void ATTR_NONNULL()
curlSetupCommon(wrkrInstanceData_t *const pWrkrData, CURL *const handle)
{
	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, pWrkrData->curlHeader);
	curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
	curl_easy_setopt(handle, CURLOPT_WRITEDATA, pWrkrData);
	if(pWrkrData->pData->allowUnsignedCerts)
		curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
	if(pWrkrData->pData->skipVerifyHost)
		curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);
	if(pWrkrData->pData->authBuf != NULL) {
		curl_easy_setopt(handle, CURLOPT_USERPWD, pWrkrData->pData->authBuf);
		/* NOTE(review): this selects auth methods for a proxy, while the
		 * credentials above are for the server - confirm this is intended */
		curl_easy_setopt(handle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
	}
	if(pWrkrData->pData->caCertFile)
		curl_easy_setopt(handle, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
	if(pWrkrData->pData->myCertFile)
		curl_easy_setopt(handle, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile);
	if(pWrkrData->pData->myPrivKeyFile)
		curl_easy_setopt(handle, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile);
	/* uncomment for in-depth debugging:
	curl_easy_setopt(handle, CURLOPT_VERBOSE, 1L); */
}

/* Configure the worker's connection-check handle: the common options plus
 * the configured health check timeout (in milliseconds).
 */
static void ATTR_NONNULL()
curlCheckConnSetup(wrkrInstanceData_t *const pWrkrData)
{
	CURL *checkHandle;

	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);

	checkHandle = pWrkrData->curlCheckConnHandle;
	curlSetupCommon(pWrkrData, checkHandle);
	curl_easy_setopt(checkHandle, CURLOPT_TIMEOUT_MS, pWrkrData->pData->healthCheckTimeout);
}

/* Configure the worker's post handle: the common options, POST as request
 * method, and TCP keep-alive (idle 120s, probe interval 60s). Keep-alive
 * options that libcurl rejects are only noted in the debug log.
 */
static void ATTR_NONNULL(1)
curlPostSetup(wrkrInstanceData_t *const pWrkrData)
{
	CURLcode cRet;

	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
	curlSetupCommon(pWrkrData, pWrkrData->curlPostHandle);
	/* must be a long: curl_easy_setopt() is variadic and reads a long here */
	curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_POST, 1L);
	/* Enable TCP keep-alive for this transfer */
	cRet = curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_TCP_KEEPALIVE, 1L);
	if (cRet != CURLE_OK)
		DBGPRINTF("omhttp: curlPostSetup unknown option CURLOPT_TCP_KEEPALIVE\n");
	/* keep-alive idle time to 120 seconds */
	cRet = curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_TCP_KEEPIDLE, 120L);
	if (cRet != CURLE_OK)
		DBGPRINTF("omhttp: curlPostSetup unknown option CURLOPT_TCP_KEEPIDLE\n");
	/* interval time between keep-alive probes: 60 seconds */
	cRet = curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_TCP_KEEPINTVL, 60L);
	if (cRet != CURLE_OK)
		DBGPRINTF("omhttp: curlPostSetup unknown option CURLOPT_TCP_KEEPINTVL\n");
}

/* Create the worker's initial header list and both curl handles.
 *
 * The header list holds the content type (configured or JSON default), the
 * optional single configured header, all "httpheaders" entries and an empty
 * Expect header. Every append is checked; on failure the partially built
 * list is freed instead of leaked.
 *
 * Returns RS_RET_OUT_OF_MEMORY when a header or a handle cannot be
 * allocated. In that case a post handle created by this call is cleaned up.
 */
static rsRetVal ATTR_NONNULL()
curlSetup(wrkrInstanceData_t *const pWrkrData)
{
	struct curl_slist *slist = NULL;
	struct curl_slist *appended;
	const char *contentType;

	DEFiRet;

	contentType = (pWrkrData->pData->httpcontenttype != NULL)
		? (const char *)pWrkrData->pData->headerContentTypeBuf
		: HTTP_HEADER_CONTENT_JSON;
	/* append into a temporary so the list built so far is not lost when
	 * curl_slist_append() returns NULL */
	CHKmalloc(appended = curl_slist_append(slist, contentType));
	slist = appended;

	if (pWrkrData->pData->headerBuf != NULL) {
		CHKmalloc(appended = curl_slist_append(slist, (char *)pWrkrData->pData->headerBuf));
		slist = appended;
	}

	for (int k = 0 ; k < pWrkrData->pData->nHttpHeaders; k++) {
		CHKmalloc(appended = curl_slist_append(slist, (char *)pWrkrData->pData->httpHeaders[k]));
		slist = appended;
	}

	// When sending more than 1Kb, libcurl automatically sends an Expect: 100-Continue header
	// and will wait 1s for a response, could make this configurable but for now disable
	CHKmalloc(appended = curl_slist_append(slist, HTTP_HEADER_EXPECT_EMPTY));
	slist = appended;

	pWrkrData->curlHeader = slist;
	slist = NULL; /* the list now belongs to pWrkrData */

	CHKmalloc(pWrkrData->curlPostHandle = curl_easy_init());
	curlPostSetup(pWrkrData);

	CHKmalloc(pWrkrData->curlCheckConnHandle = curl_easy_init());
	curlCheckConnSetup(pWrkrData);

finalize_it:
	if(iRet != RS_RET_OK) {
		if(slist != NULL) {
			/* list was never handed over to pWrkrData */
			curl_slist_free_all(slist);
		}
		if(pWrkrData->curlPostHandle != NULL) {
			curl_easy_cleanup(pWrkrData->curlPostHandle);
			pWrkrData->curlPostHandle = NULL;
		}
	}
	RETiRet;
}

/* Release the worker's curl resources (header list, connection-check
 * handle, post handle) and reset the corresponding pointers to NULL.
 * Members that are already NULL are skipped.
 */
static void ATTR_NONNULL()
curlCleanup(wrkrInstanceData_t *const pWrkrData)
{
	struct curl_slist *const headers = pWrkrData->curlHeader;
	CURL *const checkHandle = pWrkrData->curlCheckConnHandle;
	CURL *const postHandle = pWrkrData->curlPostHandle;

	if (headers != NULL) {
		curl_slist_free_all(headers);
		pWrkrData->curlHeader = NULL;
	}

	if (checkHandle != NULL) {
		curl_easy_cleanup(checkHandle);
		pWrkrData->curlCheckConnHandle = NULL;
	}

	if (postHandle != NULL) {
		curl_easy_cleanup(postHandle);
		pWrkrData->curlPostHandle = NULL;
	}
}

/* Set every instance parameter to its default before the action
 * configuration is parsed. Pointers start out NULL, switches start out off
 * except useHttps, and numeric limits get the values listed below.
 */
static void ATTR_NONNULL()
setInstParamDefaults(instanceData *const pData)
{
	pData->serverBaseUrls = NULL;
	pData->defaultPort = 443;
	pData->healthCheckTimeout = 3500;
	pData->uid = NULL;
	pData->httpcontenttype = NULL;
	pData->headerContentTypeBuf = NULL;
	pData->httpheaderkey = NULL;
	pData->httpheadervalue = NULL;
	/* headerBuf is tested against NULL when headers are built, so give it
	 * an explicit default like its sibling buffers */
	pData->headerBuf = NULL;
	pData->httpHeaders = NULL;
	pData->nHttpHeaders = 0;
	pData->pwd = NULL;
	pData->authBuf = NULL;
	pData->restPath = NULL;
	pData->checkPath = NULL;
	pData->dynRestPath = 0;
	pData->batchMode = 0;
	pData->batchFormatName = (uchar *)"newline";
	pData->batchFormat = FMT_NEWLINE;
	pData->bFreeBatchFormatName = 0;
	pData->useHttps = 1;
	pData->maxBatchBytes = 10485760; //i.e. 10 MB Is the default max message size for AWS API Gateway
	pData->maxBatchSize = 100; // 100 messages
	pData->compress = 0; // off
	pData->compressionLevel = -1; // default compression
	pData->allowUnsignedCerts = 0;
	pData->skipVerifyHost = 0;
	pData->tplName = NULL;
	pData->errorFile = NULL;
	pData->caCertFile = NULL;
	pData->myCertFile = NULL;
	pData->myPrivKeyFile = NULL;
	pData->reloadOnHup= 0;
	pData->retryFailures = 0;
	pData->ratelimitBurst = 20000;
	pData->ratelimitInterval = 600;
	pData->ratelimiter = NULL;
	pData->retryRulesetName = NULL;
	pData->retryRuleset = NULL;
}

static rsRetVal
checkHeaderParam(char *const param)
{
	DEFiRet;
	char *val = strstr(param, ":");
	if(val == NULL) {
		LogError(0, RS_RET_PARAM_ERROR, "missing ':' delimiter in "
				"parameter '%s'", param);
		ABORT_FINALIZE(RS_RET_PARAM_ERROR);
	}
finalize_it:
	RETiRet;
}

/* Create and configure one action instance from its config parameters.
 *
 * Steps: fetch the parameter values, create the instance with defaults, copy
 * each used parameter into pData, validate parameter combinations, build the
 * derived header/auth strings, request the templates, compute the server
 * base URLs, optionally create the retry ratelimiter, and append the
 * instance to the module's load-time instance list.
 */
BEGINnewActInst
	struct cnfparamvals *pvals;
	char* serverParam = NULL;
	struct cnfarray* servers = NULL;
	int i;
	int iNumTpls;
	FILE *fp;
	char errStr[1024];
	char *batchFormatName;
	int compressionLevel = -1;
CODESTARTnewActInst
	if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
	}

	CHKiRet(createInstance(&pData));
	setInstParamDefaults(pData);

	for(i = 0 ; i < actpblk.nParams ; ++i) {
		if(!pvals[i].bUsed)
			continue;
		if(!strcmp(actpblk.descr[i].name, "server")) {
			servers = pvals[i].val.d.ar;
		} else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
			pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
			pData->defaultPort = (int) pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "healthchecktimeout")) {
			pData->healthCheckTimeout = (long) pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "uid")) {
			pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "httpcontenttype")) {
			pData->httpcontenttype = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "httpheaderkey")) {
			pData->httpheaderkey = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "httpheadervalue")) {
			pData->httpheadervalue = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "httpheaders")) {
			/* zero-filled, and the count is published only after the
			 * allocation succeeded, so the array never exposes
			 * uninitialized pointers if parsing stops half way */
			CHKmalloc(pData->httpHeaders = calloc(pvals[i].val.d.ar->nmemb, sizeof(uchar *)));
			pData->nHttpHeaders = pvals[i].val.d.ar->nmemb;
			for(int j = 0 ; j <  pvals[i].val.d.ar->nmemb ; ++j) {
				char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
				CHKmalloc(cstr);
				/* store before validating so the string stays reachable
				 * through pData when validation fails */
				pData->httpHeaders[j] = (uchar *)cstr;
				CHKiRet(checkHeaderParam(cstr));
			}
		} else if(!strcmp(actpblk.descr[i].name, "pwd")) {
			pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "restpath")) {
			pData->restPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "checkpath")) {
			pData->checkPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "dynrestpath")) {
			pData->dynRestPath = pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "batch")) {
			pData->batchMode = pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "batch.format")) {
			int bFormatKnown = 0;

			batchFormatName = es_str2cstr(pvals[i].val.d.estr, NULL);
			CHKmalloc(batchFormatName);
			/* the strstr() test alone also accepts fragments such as
			 * "json" or ""; require an exact name on top of it */
			if (strstr(VALID_BATCH_FORMATS, batchFormatName) != NULL) {
				bFormatKnown = 1;
				if (!strcmp(batchFormatName, "newline")) {
					pData->batchFormat = FMT_NEWLINE;
				} else if (!strcmp(batchFormatName, "jsonarray")) {
					pData->batchFormat = FMT_JSONARRAY;
				} else if (!strcmp(batchFormatName, "kafkarest")) {
					pData->batchFormat = FMT_KAFKAREST;
				} else if (!strcmp(batchFormatName, "lokirest")) {
					pData->batchFormat = FMT_LOKIREST;
				} else {
					bFormatKnown = 0;
				}
			}
			if (bFormatKnown) {
				pData->batchFormatName = (uchar *)batchFormatName;
				pData->bFreeBatchFormatName = 1;
			} else {
				LogError(0, NO_ERRCODE, "error: 'batch.format' %s unknown defaulting to 'newline'",
					batchFormatName);
				/* the rejected name is not stored anywhere, release it */
				free(batchFormatName);
				batchFormatName = NULL;
			}
		} else if(!strcmp(actpblk.descr[i].name, "batch.maxbytes")) {
			pData->maxBatchBytes = (size_t) pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "batch.maxsize")) {
			pData->maxBatchSize = (size_t) pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "compress")) {
			pData->compress = pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "compress.level")) {
			compressionLevel = pvals[i].val.d.n;
			if (compressionLevel == -1 || (compressionLevel >= 0 && compressionLevel < 10)) {
				pData->compressionLevel = compressionLevel;
			} else {
				LogError(0, NO_ERRCODE, "omhttp: invalid compress.level %d using default instead,"
					"valid levels are -1 and 0-9",
					compressionLevel);
			}
		} else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
			pData->allowUnsignedCerts = pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "skipverifyhost")) {
			pData->skipVerifyHost = pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "usehttps")) {
			pData->useHttps = pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "template")) {
			pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
			/* the file is only probed for readability; a failure is
			 * logged but does not abort the configuration */
			pData->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
			fp = fopen((const char*)pData->caCertFile, "r");
			if(fp == NULL) {
				rs_strerror_r(errno, errStr, sizeof(errStr));
				LogError(0, RS_RET_NO_FILE_ACCESS,
						"error: 'tls.cacert' file %s couldn't be accessed: %s\n",
						pData->caCertFile, errStr);
			} else {
				fclose(fp);
			}
		} else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
			pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
			fp = fopen((const char*)pData->myCertFile, "r");
			if(fp == NULL) {
				rs_strerror_r(errno, errStr, sizeof(errStr));
				LogError(0, RS_RET_NO_FILE_ACCESS,
						"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
						pData->myCertFile, errStr);
			} else {
				fclose(fp);
			}
		} else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
			pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
			fp = fopen((const char*)pData->myPrivKeyFile, "r");
			if(fp == NULL) {
				rs_strerror_r(errno, errStr, sizeof(errStr));
				LogError(0, RS_RET_NO_FILE_ACCESS,
						"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
						pData->myPrivKeyFile, errStr);
			} else {
				fclose(fp);
			}
		} else if(!strcmp(actpblk.descr[i].name, "reloadonhup")) {
			pData->reloadOnHup= pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "retry")) {
			pData->retryFailures = pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "retry.ruleset")) {
			pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
			pData->ratelimitBurst = (unsigned int) pvals[i].val.d.n;
		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
			pData->ratelimitInterval = (unsigned int) pvals[i].val.d.n;
		} else {
			LogError(0, RS_RET_INTERNAL_ERROR, "omhttp: program error, "
				"non-handled param '%s'", actpblk.descr[i].name);
		}
	}

	if(pData->pwd != NULL && pData->uid == NULL) {
		LogError(0, RS_RET_UID_MISSING,
			"omhttp: password is provided, but no uid "
			"- action definition invalid");
		ABORT_FINALIZE(RS_RET_UID_MISSING);
	}
	if(pData->httpheaderkey != NULL && pData->httpheadervalue == NULL) {
		LogError(0, RS_RET_UID_MISSING,
			"omhttp: http header key is provided, but no http header value "
			"- action definition invalid");
		ABORT_FINALIZE(RS_RET_UID_MISSING);
	}
	if(pData->dynRestPath && pData->restPath == NULL) {
		LogError(0, RS_RET_CONFIG_ERROR,
			"omhttp: requested dynamic rest path, but no name for rest "
			"path template given - action definition invalid");
		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
	}

	if (pData->uid != NULL)
		CHKiRet(computeAuthHeader((char*) pData->uid, (char*) pData->pwd, &pData->authBuf));
	if (pData->httpcontenttype != NULL)
		CHKiRet(computeApiHeader((char*) "Content-Type",
				(char*) pData->httpcontenttype, &pData->headerContentTypeBuf));

	if (pData->httpheaderkey != NULL)
		CHKiRet(computeApiHeader((char*) pData->httpheaderkey,
				(char*) pData->httpheadervalue, &pData->headerBuf));

	iNumTpls = 1;
	if(pData->dynRestPath) ++iNumTpls;
	DBGPRINTF("omhttp: requesting %d templates\n", iNumTpls);
	CODE_STD_STRING_REQUESTnewActInst(iNumTpls)

	CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
					    " StdJSONFmt" : (char*)pData->tplName),
		OMSR_NO_RQD_TPL_OPTS));


	/* we need to request additional templates. If we have a dynamic search index,
	 * it will always be string 1. Type may be 1 or 2, depending on whether search
	 * index is dynamic as well. Rule needs to be followed throughout the module.
	 */
	iNumTpls = 1;
	if(pData->dynRestPath) {
		CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->restPath),
			OMSR_NO_RQD_TPL_OPTS));
		++iNumTpls;
	}

	if (servers != NULL) {
		/* zero-filled, and the count is published only after the
		 * allocation succeeded, so a failure while filling the array
		 * never leaves uninitialized entries behind */
		pData->serverBaseUrls = calloc(servers->nmemb, sizeof(uchar*));
		if (pData->serverBaseUrls == NULL) {
			LogError(0, RS_RET_ERR, "omhttp: unable to allocate buffer "
					"for http server configuration.");
			ABORT_FINALIZE(RS_RET_ERR);
		}
		pData->numServers = servers->nmemb;

		for(i = 0 ; i < servers->nmemb ; ++i) {
			serverParam = es_str2cstr(servers->arr[i], NULL);
			if (serverParam == NULL) {
				LogError(0, RS_RET_ERR, "omhttp: unable to allocate buffer "
					"for http server configuration.");
				ABORT_FINALIZE(RS_RET_ERR);
			}
			/* Remove a trailing slash if it exists; an empty string has
			 * no last character to inspect */
			const size_t serverParamLen = strlen(serverParam);
			if (serverParamLen > 0 && serverParam[serverParamLen - 1] == '/') {
				serverParam[serverParamLen - 1] = '\0';
			}
			CHKiRet(computeBaseUrl(serverParam, pData->defaultPort, pData->useHttps,
				pData->serverBaseUrls + i));
			free(serverParam);
			serverParam = NULL;
		}
	} else {
		LogMsg(0, RS_RET_OK, LOG_WARNING,
			"omhttp: No servers specified, using localhost");
		pData->serverBaseUrls = calloc(1, sizeof(uchar*));
		if (pData->serverBaseUrls == NULL) {
			LogError(0, RS_RET_ERR, "omhttp: unable to allocate buffer "
					"for http server configuration.");
			ABORT_FINALIZE(RS_RET_ERR);
		}
		pData->numServers = 1;
		CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls));
	}

	if (pData->retryFailures) {
		CHKiRet(ratelimitNew(&pData->ratelimiter, "omhttp", NULL));
		ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
		ratelimitSetNoTimeCache(pData->ratelimiter);
	}

	/* node created, let's add to list of instance configs for the module */
	if(loadModConf->tail == NULL) {
		loadModConf->tail = loadModConf->root = pData;
	} else {
		loadModConf->tail->next = pData;
		loadModConf->tail = pData;
	}

CODE_STD_FINALIZERnewActInst
	cnfparamvalsDestruct(pvals, &actpblk);
	if (serverParam)
		free(serverParam);
ENDnewActInst


/* Start of config load: remember the module config being loaded in
 * loadModConf, record the owning rsyslog config and start with an empty
 * instance list.
 */
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
	loadModConf = pModConf;
	pModConf->pConf = pConf;
	pModConf->root = pModConf->tail = NULL;
ENDbeginCnfLoad


/* End of config load: clear loadModConf so no module config is marked as
 * being loaded any more.
 */
BEGINendCnfLoad
CODESTARTendCnfLoad
	loadModConf = NULL; /* done loading */
ENDendCnfLoad


/* Config check: resolve each instance's retry.ruleset name to a ruleset.
 * If the lookup does not succeed, the problem is logged and the instance
 * keeps its current retryRuleset value (no retry ruleset is attached).
 */
BEGINcheckCnf
	instanceConf_t *inst;
CODESTARTcheckCnf
	for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
		ruleset_t *pRuleset = NULL;
		rsRetVal localRet;

		if (inst->retryRulesetName) {
			localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
			if(localRet == RS_RET_NOT_FOUND) {
				LogError(0, localRet, "omhttp: retry.ruleset '%s' not found - "
						"no retry ruleset will be used", inst->retryRulesetName);
			} else if(localRet != RS_RET_OK) {
				/* any other failure leaves pRuleset without a valid
				 * value, so it must not be stored either */
				LogError(0, localRet, "omhttp: error looking up retry.ruleset '%s' - "
						"no retry ruleset will be used", inst->retryRulesetName);
			} else {
				inst->retryRuleset = pRuleset;
			}
		}
	}
ENDcheckCnf


/* Config activation: this module adds no code of its own here. */
BEGINactivateCnf
CODESTARTactivateCnf
ENDactivateCnf


/* Config release: this module adds no code of its own here. */
BEGINfreeCnf
CODESTARTfreeCnf
ENDfreeCnf


// HUP handling for the instance...
BEGINdoHUP
CODESTARTdoHUP
	pthread_mutex_lock(&pData->mutErrFile);
	if (pData->fdErrFile != -1) {
		close(pData->fdErrFile);
		pData->fdErrFile = -1;
	}
	pthread_mutex_unlock(&pData->mutErrFile);
ENDdoHUP


// HUP handling for the worker...
BEGINdoHUPWrkr
CODESTARTdoHUPWrkr
	if (pWrkrData->pData->reloadOnHup) {
		LogMsg(0, NO_ERRCODE, LOG_INFO, "omhttp: received HUP reloading curl handles");
		curlCleanup(pWrkrData);
		CHKiRet(curlSetup(pWrkrData));
	}
finalize_it:
ENDdoHUPWrkr


/* Module unload: destroy module-global objects first, then shut down libcurl
 * and finally release the object interfaces obtained in modInit. Objects are
 * destructed while their interface is still held, so no interface is called
 * after it has been released.
 */
BEGINmodExit
CODESTARTmodExit
	if(pInputName != NULL)
		prop.Destruct(&pInputName);
	/* must happen before objRelease(statsobj, ...) below */
	statsobj.Destruct(&httpStats);
	curl_global_cleanup();
	objRelease(prop, CORE_COMPONENT);
	objRelease(ruleset, CORE_COMPONENT);
	objRelease(statsobj, CORE_COMPONENT);
ENDmodExit

/* NOTE(review): presumably declares that this module offers no legacy
 * (selector-line) action syntax - confirm against module-template.h.
 */
NO_LEGACY_CONF_parseSelectorAct

/* Entry point query function. Each CODEqueryEtryPt_* macro below contributes
 * a group of entry points; see module-template.h for what each group covers.
 */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_doHUP /* Load the instance HUP handling code */
CODEqueryEtryPt_doHUPWrkr /* Load the worker HUP handling code */
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt


/* Module initialisation. In order:
 *  - acquires the prop, ruleset and statsobj object interfaces,
 *  - builds the module-wide "omhttp" statistics object and registers all
 *    message/request counters with it,
 *  - initialises libcurl globally,
 *  - constructs the pInputName property holding the string "omhttp".
 * Any failing step aborts initialisation via CHKiRet/ABORT_FINALIZE.
 */
BEGINmodInit()
CODESTARTmodInit
	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
	/* object interfaces used by this module; released again in modExit */
	CHKiRet(objUse(prop, CORE_COMPONENT));
	CHKiRet(objUse(ruleset, CORE_COMPONENT));
	CHKiRet(objUse(statsobj, CORE_COMPONENT));

	/* module-wide statistics object; name and origin are both "omhttp" */
	CHKiRet(statsobj.Construct(&httpStats));
	CHKiRet(statsobj.SetName(httpStats, (uchar *)"omhttp"));
	CHKiRet(statsobj.SetOrigin(httpStats, (uchar*)"omhttp"));

	/* per-message counters */
	STATSCOUNTER_INIT(ctrMessagesSubmitted, mutCtrMessagesSubmitted);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"messages.submitted",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrMessagesSubmitted));

	STATSCOUNTER_INIT(ctrMessagesSuccess, mutCtrMessagesSuccess);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"messages.success",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrMessagesSuccess));

	STATSCOUNTER_INIT(ctrMessagesFail, mutCtrMessagesFail);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"messages.fail",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrMessagesFail));

	STATSCOUNTER_INIT(ctrMessagesRetry, mutCtrMessagesRetry);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"messages.retry",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrMessagesRetry));

	/* per-HTTP-request counters */
	STATSCOUNTER_INIT(ctrHttpRequestCount, mutCtrHttpRequestCount);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"request.count",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrHttpRequestCount));

	STATSCOUNTER_INIT(ctrHttpRequestSuccess, mutCtrHttpRequestSuccess);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"request.success",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrHttpRequestSuccess));

	STATSCOUNTER_INIT(ctrHttpRequestFail, mutCtrHttpRequestFail);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"request.fail",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrHttpRequestFail));

	/* counters keyed on the HTTP status outcome */
	STATSCOUNTER_INIT(ctrHttpStatusSuccess, mutCtrHttpStatusSuccess);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"request.status.success",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrHttpStatusSuccess));

	STATSCOUNTER_INIT(ctrHttpStatusFail, mutCtrHttpStatusFail);
	CHKiRet(statsobj.AddCounter(httpStats, (uchar *)"request.status.fail",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrHttpStatusFail));

	/* all counters are registered; finish construction of the stats object */
	CHKiRet(statsobj.ConstructFinalize(httpStats));

	/* one-time global libcurl initialisation; paired with
	 * curl_global_cleanup() in modExit
	 */
	if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
		LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -http disabled");
		ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED);
	}

	/* property object carrying the module name "omhttp" */
	CHKiRet(prop.Construct(&pInputName));
	CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omhttp"), sizeof("omhttp") - 1));
	CHKiRet(prop.ConstructFinalize(pInputName));
ENDmodInit

/* vi:set ai:
 */
